package com.example.demo.stream.source;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.env.SpringContextHolder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.List;

/**
 * Flink source that loads records from a database through a MyBatis-Plus mapper
 * and emits each record into the stream.
 *
 * <p>The mapper is not held as a field; only its class is stored, and the actual
 * bean is resolved through {@link SpringContextHolder} when {@link #run} executes.
 * The source issues a single {@code selectList} query, emits the returned rows
 * and then finishes.
 *
 * @param <Out> entity type returned by the mapper and emitted by this source
 * @author: wbx
 */
public class DbSource<Out> extends RichSourceFunction<Out> {

    /**
     * Query wrapper passed to {@code selectList}; handed to the mapper as-is.
     */
    private final LambdaQueryWrapper<Out> ew;

    /**
     * Class of the mapper interface used to look up the mapper bean at run time.
     */
    private final Class<? extends BaseMapper<Out>> mapperClass;

    /**
     * Cleared by {@link #cancel()} so that {@link #run} stops emitting.
     * Volatile because {@code cancel()} is typically invoked from a different
     * thread than the one executing {@code run()}.
     */
    private volatile boolean running = true;

    /**
     * Creates a source that queries through the given mapper type.
     *
     * @param mapperClass  mapper interface class used to resolve the mapper bean
     * @param queryWrapper query conditions forwarded to {@code selectList}
     */
    public DbSource(Class<? extends BaseMapper<Out>> mapperClass, LambdaQueryWrapper<Out> queryWrapper) {
        this.mapperClass = mapperClass;
        this.ew = queryWrapper;
    }


    /**
     * Queries the database once and emits every returned row to the source context.
     *
     * <p>Emission stops early once {@link #cancel()} has been called. The query
     * itself is a single blocking call and is not interrupted by cancellation.
     *
     * @param ctx source context that receives the queried records
     */
    @Override
    public void run(SourceContext<Out> ctx) {
        // Resolve the mapper bean by its class via the context holder.
        BaseMapper<Out> mapper = SpringContextHolder.getBean(mapperClass);
        // Run the query with the supplied wrapper.
        List<Out> outs = mapper.selectList(ew);
        if (CollectionUtils.isEmpty(outs)) {
            return;
        }
        for (Out out : outs) {
            // Honor cancellation between records so a cancelled job does not
            // keep pushing the remainder of a large result set downstream.
            if (!running) {
                break;
            }
            ctx.collect(out);
        }
    }

    /**
     * Signals {@link #run} to stop emitting records.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
